/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.server.master.runner;

import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.*;
import cn.escheduler.common.graph.DAG;
import cn.escheduler.common.model.TaskNode;
import cn.escheduler.common.model.TaskNodeRelation;
import cn.escheduler.common.process.ProcessDag;
import cn.escheduler.common.thread.Stopper;
import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.*;
import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.utils.DagHelper;
import cn.escheduler.server.utils.AlertManager;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import static cn.escheduler.common.Constants.*;

/**
 * master exec thread,split dag
 */
public class MasterExecThread implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(MasterExecThread.class);

    /**
     * process instance driven by this thread
     */
    private ProcessInstance processInstance;

    /**
     * running task exec threads, each mapped to the future returned by {@link #taskExecService}
     */
    private final Map<MasterBaseTaskExecThread, Future<Boolean>> activeTaskNode = new ConcurrentHashMap<>();

    /**
     * fixed-size daemon pool running the task exec threads; sized from MASTER_EXEC_TASK_THREADS
     */
    private final ExecutorService taskExecService;

    /**
     * set when a task exec thread finishes without a task instance (treated as a failed submission)
     */
    private Boolean taskFailedSubmit = false;
    // task instances to resume from (recovery start nodes), resolved from the command params
    private List<TaskInstance> recoverNodeIdList = new ArrayList<>();
    // failed tasks that can no longer be retried, keyed by task name
    private Map<String,TaskInstance> errorTaskList = new ConcurrentHashMap<>();
    // completed tasks (both successful and failed), keyed by task name
    private Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
    // tasks waiting to be submitted (standby list), keyed by task name
    private Map<String, TaskInstance> readyToSubmitTaskList = new ConcurrentHashMap<>();
    // tasks whose dependency failed (such a task is not submitted and its state becomes failure)
    private Map<String, TaskInstance> dependFailedTask = new ConcurrentHashMap<>();
    // task nodes forbidden (disabled) in the process definition, keyed by task name
    private Map<String, TaskNode> forbiddenTaskList = new ConcurrentHashMap<>();
    // tasks needing fault tolerance (failed because of a node failure); alerted and cleared each monitoring round
    private List<TaskInstance> recoverToleranceFaultTaskList = new ArrayList<>();

    /**
     * sends process, timeout and fault-tolerance alerts
     */
    private AlertManager alertManager = new AlertManager();

    /**
     * DAG of the current run; (re)built by buildFlowDag()
     */
    private DAG<String,TaskNode,TaskNodeRelation> dag;

    /**
     *  process dao
     */
    private ProcessDao processDao;

    /**
     * master configuration, loaded from MASTER_PROPERTIES_PATH by the static initializer
     */
    private static Configuration conf;

    /**
     * Create the exec thread for one process instance.
     * The task executor pool size is read from the master configuration
     * ({@code MASTER_EXEC_TASK_THREADS}), falling back to the built-in default.
     *
     * @param processInstance instance to drive; when null, {@link #run()} returns immediately
     */
    public MasterExecThread(ProcessInstance processInstance) {
        this.processInstance = processInstance;
        this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
        final int poolSize = conf.getInt(Constants.MASTER_EXEC_TASK_THREADS, Constants.defaultMasterTaskExecNum);
        this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread", poolSize);
    }

    /*
     * Load the master configuration once per class. A missing or unreadable
     * configuration is fatal: the error is logged and the JVM exits with status 1.
     */
    static {
        Configuration loaded = null;
        try {
            loaded = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH);
        } catch (ConfigurationException e) {
            logger.error("load configuration failed : " + e.getMessage(), e);
            System.exit(1);
        }
        conf = loaded;
    }

    /**
     * Drive this process instance to completion.
     * <p>
     * Returns immediately when the instance is missing or already finished. Otherwise a
     * complement-data command on a top-level (non sub-process) instance runs the complement
     * loop, and anything else runs a single execution. An exception during execution marks the
     * instance FAILURE with an end time.
     * <p>
     * The task executor created in the constructor is shut down on every path, including the
     * early returns. The exec-dir cleanup in {@link #postHandle()} only runs once execution has
     * actually been attempted.
     */
    @Override
    public void run() {
        boolean executionStarted = false;
        try {
            // process instance is null
            if (processInstance == null) {
                logger.info("process instance is not exists");
                return;
            }

            // check to see if it's done
            if (processInstance.getState().typeIsFinished()) {
                logger.info("process instance is done : {}", processInstance.getId());
                return;
            }

            executionStarted = true;
            try {
                if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) {
                    // complement data for a top-level (non sub-process) instance
                    executeComplementProcess();
                } else {
                    // execute flow
                    executeProcess();
                }
            } catch (Exception e) {
                logger.error("master exec thread exception: " + e.getMessage(), e);
                logger.error("process execute failed, process id:{}", processInstance.getId());
                processInstance.setState(ExecutionStatus.FAILURE);
                processInstance.setEndTime(new Date());
                processDao.updateProcessInstance(processInstance);
            }
        } finally {
            // always release the task executor, even when nothing was executed
            taskExecService.shutdown();
            if (executionStarted) {
                // post handle
                postHandle();
            }
        }
    }

    /**
     * Run a single (non-complement) execution of the process instance.
     * It resets the task bookkeeping and builds the DAG, then drives tasks until the
     * instance stops. Finally it persists the end time and sends the process alert.
     *
     * @throws Exception propagated from {@link #prepareProcess()}
     */
    private void executeProcess() throws Exception {
        prepareProcess();
        runProcess();
        endProcess();
    }

    /**
     * Execute a complement-data (backfill) run, reusing this process instance for each date.
     * <p>
     * Dates start at the instance's schedule time, or at the command's complement start date
     * when that is unset. They advance with {@code DateUtils.getSomeDay(scheduleDate, 1)} until
     * they pass the command's complement end date. For every date the DAG is rebuilt and run.
     * The loop stops early on the first run that does not end in a success state, or when the
     * stopper stops.
     * <p>
     * Between dates, the existing valid task instances are flagged {@code Flag.NO} and the
     * recovery start nodes are dropped from the command params. The global parameters are then
     * re-cured for the new schedule time and the instance is saved as running.
     * <p>
     * If the DAG cannot be built, the instance is marked SUCCESS and the method returns
     * without calling {@link #endProcess()}.
     *
     * @throws Exception propagated from the helpers called here, e.g. {@link #prepareProcess()}
     */
    private void executeComplementProcess() throws Exception {
        Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());

        Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
        Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
        processDao.saveProcessInstance(processInstance);
        Date scheduleDate = processInstance.getScheduleTime();

        // NOTE(review): this fallback only affects the local date arithmetic; the instance's own
        // schedule time is not set to startDate before the first run. Confirm that is intended.
        if (scheduleDate == null) {
            scheduleDate = startDate;
        }

        while (Stopper.isRunning()) {
            // prepare dag and other info
            prepareProcess();

            if (dag == null) {
                logger.error("process {} dag is null, please check out parameters", processInstance.getId());
                processInstance.setState(ExecutionStatus.SUCCESS);
                processDao.updateProcessInstance(processInstance);
                return;
            }

            // execute process, waiting for end
            runProcess();

            // process instance failure, no more complements
            if (!processInstance.getState().typeIsSuccess()) {
                logger.info("process {} state {}, complement not completely!", processInstance.getId(), processInstance.getState());
                break;
            }

            // current process instance success, next execute
            scheduleDate = DateUtils.getSomeDay(scheduleDate, 1);
            if(scheduleDate.after(endDate)){
                // all success
                logger.info("process {} complement completely!", processInstance.getId());
                break;
            }

            logger.info("process {} start to complement {} data", processInstance.getId(), DateUtils.dateToString(scheduleDate));
            // execute next process instance complement data
            processInstance.setScheduleTime(scheduleDate);

            // the recovery start nodes only need to run once, so drop them for the following dates
            if (cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)) {
                cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
                processInstance.setCommandParam(JSONUtils.toJson(cmdParam));
            }

            // flag the previous date's task instances NO so the next date starts with fresh ones
            List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
            for(TaskInstance taskInstance : taskInstanceList){
                taskInstance.setFlag(Flag.NO);
                processDao.updateTaskInstance(taskInstance);
            }
            processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
            processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
                    processInstance.getProcessDefinition().getGlobalParamMap(),
                    processInstance.getProcessDefinition().getGlobalParamList(),
                    CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
            processDao.saveProcessInstance(processInstance);
        }

        // flow end
        endProcess();
    }

    /**
     * Prepare one run of the process. It resets and reloads the task bookkeeping from the
     * task instances stored for this process instance, then builds the DAG into {@code dag}.
     * @throws Exception propagated from {@link #buildFlowDag()}
     */
    private void prepareProcess() throws Exception {
        // init task queue
        initTaskQueue();

        // gen process dag
        buildFlowDag();
        logger.info("prepare process :{} end", processInstance.getId());
    }

    /**
     * Finalise the process instance.
     * It stamps the end time and persists the instance. When the instance ended waiting for a
     * thread it queues a recovery command. Finally it sends the process-instance alert with
     * the current valid task list.
     */
    private void endProcess() {
        processInstance.setEndTime(new Date());
        processDao.updateProcessInstance(processInstance);

        boolean waitingForThread = processInstance.getState().typeIsWaittingThread();
        if (waitingForThread) {
            processDao.createRecoveryWaitingThreadCommand(null, processInstance);
        }

        alertManager.sendAlertProcessInstance(processInstance,
                processDao.findValidTaskListByProcessId(processInstance.getId()));
    }

    /**
     * Generate the process DAG for the current command.
     * <p>
     * Resolves the recovery start task instances and the forbidden task nodes. It then builds
     * the DAG from the process instance JSON, restricted by the start/recovery node names and
     * the task dependency type.
     * <p>
     * If no DAG can be generated, {@code dag} is reset to {@code null}. The complement loop
     * calls this repeatedly, and this way its {@code dag == null} check sees the failure
     * instead of silently re-running the previous iteration's DAG.
     *
     * @throws Exception propagated from the DAG helpers
     */
    private void buildFlowDag() throws Exception {
        // task instances to resume from (recovery start nodes)
        recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
        // task nodes forbidden (disabled) in the process definition
        forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson());
        // generate process to get DAG info
        List<String> recoveryNameList = getRecoveryNodeNameList();
        List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
        ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(),
                startNodeNameList, recoveryNameList, processInstance.getTaskDependType());
        if (processDag == null) {
            logger.error("processDag is null");
            // never keep the DAG left over from a previous run
            dag = null;
            return;
        }
        // generate process dag
        dag = DagHelper.buildDagGraph(processDag);
    }

    /**
     * Reset the per-run task bookkeeping and reload it from the DB.
     * <p>
     * Every valid task instance of this process instance that is complete goes into
     * {@code completeTaskList}. A failed one that can no longer be retried also goes into
     * {@code errorTaskList}.
     * <p>
     * This runs before every run, including each date of a complement run, so the standby
     * list is cleared too. Otherwise entries left from the previous run could leak into the
     * next one.
     */
    private void initTaskQueue(){
        taskFailedSubmit = false;
        activeTaskNode.clear();
        dependFailedTask.clear();
        completeTaskList.clear();
        errorTaskList.clear();
        readyToSubmitTaskList.clear();
        List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
        for(TaskInstance task : taskInstanceList){
            if(task.isTaskComplete()){
                completeTaskList.put(task.getName(), task);
            }
            if(task.getState().typeIsFailure() && !task.taskCanRetry()){
                errorTaskList.put(task.getName(), task);
            }
        }
    }

    /**
     * Post-run cleanup. Outside develop mode, this deletes the local execution directory of
     * this process instance. A failed deletion is logged, not rethrown.
     */
    private void postHandle() {
        logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
        if (CommonUtils.isDevelopMode()) {
            return;
        }

        String execLocalPath = cn.escheduler.common.utils.FileUtils.getProcessExecDir(
                processInstance.getProcessDefinition().getProjectId(),
                processInstance.getProcessDefinitionId(),
                processInstance.getId());
        try {
            FileUtils.deleteDirectory(new File(execLocalPath));
        } catch (IOException e) {
            logger.error("delete exec dir failed : " + e.getMessage(), e);
        }
    }

    /**
     * Submit a task instance to the task executor pool.
     * Sub-process tasks run in a {@code SubProcessTaskExecThread} and all others in a
     * {@code MasterTaskExecThread}. The thread and its future are tracked in {@code activeTaskNode}.
     *
     * @param taskInstance task instance to run
     * @return the exec thread's task instance, read right after submission
     */
    private TaskInstance submitTaskExec(TaskInstance taskInstance) {
        MasterBaseTaskExecThread execThread = taskInstance.isSubProcess()
                ? new SubProcessTaskExecThread(taskInstance, processInstance)
                : new MasterTaskExecThread(taskInstance, processInstance);
        Future<Boolean> submitted = taskExecService.submit(execThread);
        activeTaskNode.putIfAbsent(execThread, submitted);
        return execThread.getTaskInstance();
    }

    /**
     * Look up a valid task instance of this process instance by task name, so the same
     * task is not created twice.
     *
     * @param taskName task (node) name
     * @return the first matching task instance from the DB, or {@code null} if there is none
     */
    private TaskInstance findTaskIfExists(String taskName) {
        return processDao.findValidTaskListByProcessId(this.processInstance.getId())
                .stream()
                .filter(candidate -> candidate.getName().equals(taskName))
                .findFirst()
                .orElse(null);
    }

    /**
     * Build, or reuse, the task instance for a DAG node.
     * <p>
     * If a valid task instance with the same name is already stored for this process instance,
     * it is returned unchanged. Otherwise a new instance is populated from the node definition:
     * state {@code SUBMITTED_SUCCESS}, retry counter 0, alert flag NO and flag YES. The new
     * instance is not persisted here.
     *
     * @param processInstance owning process instance
     * @param nodeName        task node name
     * @param taskNode        node definition taken from the DAG
     * @param parentNodeName  upstream node name (not used by this method)
     * @return the existing or newly built task instance
     */
    private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName,
                                            TaskNode taskNode, String parentNodeName) {
        TaskInstance existing = findTaskIfExists(nodeName);
        if (existing != null) {
            return existing;
        }

        TaskInstance fresh = new TaskInstance();
        fresh.setName(nodeName);
        fresh.setProcessDefinitionId(processInstance.getProcessDefinitionId());
        fresh.setState(ExecutionStatus.SUBMITTED_SUCCESS);
        fresh.setProcessInstanceId(processInstance.getId());
        // the node definition is kept on the instance as JSON
        fresh.setTaskJson(JSONObject.toJSONString(taskNode));
        fresh.setTaskType(taskNode.getType());
        fresh.setAlertFlag(Flag.NO);
        fresh.setStartTime(new Date());
        fresh.setFlag(Flag.YES);

        // retry settings: the counter starts at 0, limits come from the node definition
        fresh.setRetryTimes(0);
        fresh.setMaxRetryTimes(taskNode.getMaxRetryTimes());
        fresh.setRetryInterval(taskNode.getRetryInterval());

        // nodes without an explicit priority run at MEDIUM
        Priority priority = taskNode.getTaskInstancePriority();
        fresh.setTaskInstancePriority(priority != null ? priority : Priority.MEDIUM);

        int workerGroupId = taskNode.getWorkerGroupId();
        fresh.setWorkerGroupId(workerGroupId);
        return fresh;
    }

    /**
     * Collect the task instances for the nodes that can start after {@code parentNodeName}.
     * Candidate node names come from {@code DagHelper.getStartVertex}, given the current
     * {@code completeTaskList}. Each candidate is turned into a task instance by
     * {@link #createTaskInstance}.
     *
     * @param dag            process DAG
     * @param parentNodeName upstream node name; callers pass {@code null} to get the start nodes
     * @return task instances in candidate order; empty when there are no candidates
     */
    private List<TaskInstance> getPostTaskInstanceByNode(
            DAG<String, TaskNode, TaskNodeRelation> dag, String parentNodeName) {
        Collection<String> candidates = DagHelper.getStartVertex(parentNodeName, dag, completeTaskList);
        List<TaskInstance> postTasks = new ArrayList<>();
        if (candidates != null) {
            for (String candidate : candidates) {
                postTasks.add(createTaskInstance(processInstance, candidate, dag.getNode(candidate), parentNodeName));
            }
        }
        return postTasks;
    }

    /**
     * Compute the tasks to submit when the process starts.
     * <p>
     * Starting from the DAG start nodes, nodes whose task instance already succeeded are
     * skipped over, and their successors are examined in the next round. A non-successful node
     * is returned unless it is already in {@code completeTaskList} or {@code errorTaskList}.
     * The walk ends when a round finds no successful node, or when the stopper stops.
     *
     * @return the tasks to submit first (may contain duplicates reached from several parents)
     */
    private List<TaskInstance> getStartSubmitTaskList() {
        List<TaskInstance> frontier = getPostTaskInstanceByNode(dag, null);
        Map<String, TaskInstance> succeeded = new HashMap<>();
        List<TaskInstance> submittable = new ArrayList<>();

        while (Stopper.isRunning()) {
            for (TaskInstance candidate : frontier) {
                String name = candidate.getName();
                if (candidate.getState().typeIsSuccess()) {
                    succeeded.put(name, candidate);
                    continue;
                }
                boolean alreadyFinished = completeTaskList.containsKey(name) || errorTaskList.containsKey(name);
                if (!alreadyFinished) {
                    submittable.add(candidate);
                }
            }
            frontier.clear();

            if (succeeded.isEmpty()) {
                break;
            }
            for (String name : succeeded.keySet()) {
                frontier.addAll(getPostTaskInstanceByNode(dag, name));
            }
            succeeded.clear();
        }
        return submittable;
    }

    /**
     * Queue the tasks that follow {@code parentNodeName} on the standby list.
     * With a {@code null} parent, the DAG's start tasks are used.
     * Tasks already on the standby list or already completed are skipped. Tasks in a pause or
     * cancel state are only logged.
     *
     * @param parentNodeName name of the node that just succeeded, or {@code null} at process start
     */
    private void submitPostNode(String parentNodeName) {
        List<TaskInstance> candidates = (parentNodeName == null)
                ? getStartSubmitTaskList()
                : getPostTaskInstanceByNode(dag, parentNodeName);

        for (TaskInstance task : candidates) {
            String taskName = task.getName();
            if (readyToSubmitTaskList.containsKey(taskName)) {
                continue;
            }
            if (completeTaskList.containsKey(taskName)) {
                logger.info("task {} has already run success", taskName);
                continue;
            }
            ExecutionStatus state = task.getState();
            if (state.typeIsPause() || state.typeIsCancel()) {
                logger.info("task {} stopped, the state is {}", taskName, state.toString());
                continue;
            }
            addTaskToStandByList(task);
        }
    }

    /**
     * Determine whether the dependencies of a task node are satisfied.
     * <p>
     * DAG start nodes are always satisfied. Otherwise each dependency that is not forbidden is
     * checked in order, and the first non-success outcome is returned:
     * <ul>
     *   <li>not yet in {@code completeTaskList}, or paused/cancelled: {@code WAITING};</li>
     *   <li>failed: {@code FAILED}.</li>
     * </ul>
     * If every dependency passes, the result is {@code SUCCESS}.
     *
     * @param taskName task node name
     * @return the dependency result
     */
    private DependResult isTaskDepsComplete(String taskName) {
        if (dag.getBeginNode().contains(taskName)) {
            return DependResult.SUCCESS;
        }

        for (String dependency : dag.getNode(taskName).getDepList()) {
            if (forbiddenTaskList.containsKey(dependency)) {
                continue;
            }
            // ConcurrentHashMap holds no null values, so null here means "not completed yet"
            TaskInstance finished = completeTaskList.get(dependency);
            if (finished == null) {
                return DependResult.WAITING;
            }
            ExecutionStatus dependencyState = finished.getState();
            if (dependencyState.typeIsFailure()) {
                return DependResult.FAILED;
            }
            if (dependencyState.typeIsPause() || dependencyState.typeIsCancel()) {
                return DependResult.WAITING;
            }
        }

        logger.info("taskName: {} completeDependTaskList: {}",
                taskName, Arrays.toString(completeTaskList.keySet().toArray()));
        return DependResult.SUCCESS;
    }


    /**
     * query task instance by complete state 按完成状态查询任务实例
     * @param state
     * @return
     */
    private List<TaskInstance> getCompleteTaskByState(ExecutionStatus state) {
        List<TaskInstance> resultList = new ArrayList<>();
        Set<String> taskList = completeTaskList.keySet();
        for (String taskName : taskList) {
            TaskInstance taskInstance = completeTaskList.get(taskName);
            if (taskInstance.getState() == state) {
                resultList.add(taskInstance);
            }
        }
        return resultList;
    }

    /**
     * State to report while tasks are still running.
     * A pending stop/pause request or a waiting-thread state is kept unchanged until the running
     * tasks finish. Any other state is reported as {@code RUNNING_EXEUTION}.
     *
     * @param state the process instance state read from the DB
     * @return the state to report
     */
    private ExecutionStatus runningState(ExecutionStatus state) {
        boolean keepAsIs = state == ExecutionStatus.READY_STOP
                || state == ExecutionStatus.READY_PAUSE
                || state == ExecutionStatus.WAITTING_THREAD;
        return keepAsIs ? state : ExecutionStatus.RUNNING_EXEUTION;
    }

    /**
     * Whether any task has failed in this run. That covers a failed submission, a failed task
     * that can no longer be retried ({@code errorTaskList}), or a task whose dependency
     * failed ({@code dependFailedTask}).
     *
     * @return true if at least one failure was recorded
     */
    private Boolean hasFailedTask() {
        return this.taskFailedSubmit
                || !this.errorTaskList.isEmpty()
                || !this.dependFailedTask.isEmpty();
    }

    /**
     * Whether the process instance should be considered failed.
     * <p>
     * Only applies when some task has failed. Under {@code FailureStrategy.END} that alone is
     * enough. Under {@code CONTINUE}, the instance fails when either the standby list or the
     * active task map is empty. Any other strategy never fails here.
     * <p>
     * NOTE(review): {@code getProcessInstanceState} only calls this once {@code activeTaskNode}
     * is empty. Under CONTINUE this therefore returns true as soon as any task has failed.
     * Confirm the {@code ||} is intended.
     *
     * @return true if the instance has failed
     */
    private Boolean processFailed() {
        if (!hasFailedTask()) {
            return false;
        }
        FailureStrategy strategy = processInstance.getFailureStrategy();
        if (strategy == FailureStrategy.END) {
            return true;
        }
        if (strategy == FailureStrategy.CONTINUE) {
            return readyToSubmitTaskList.isEmpty() || activeTaskNode.isEmpty();
        }
        return false;
    }

    /**
     * Whether any completed task ended in the {@code WAITTING_THREAD} state.
     *
     * @return true if such a task exists
     */
    private Boolean hasWaitingThreadTask() {
        return !getCompleteTaskByState(ExecutionStatus.WAITTING_THREAD).isEmpty();
    }

    /**
     * Resolve the final state of a pause request. The checks run in this order:
     * <ol>
     *   <li>a failed task still waiting for retry on the standby list: {@code FAILURE};</li>
     *   <li>a paused task, an unfinished complement run, or pending standby tasks: {@code PAUSE};</li>
     *   <li>otherwise: {@code SUCCESS}.</li>
     * </ol>
     *
     * @return the resolved state
     */
    private ExecutionStatus processReadyPause() {
        if (hasRetryTaskInStandBy()) {
            return ExecutionStatus.FAILURE;
        }
        boolean workRemains = !getCompleteTaskByState(ExecutionStatus.PAUSE).isEmpty()
                || !isComplementEnd()
                || !readyToSubmitTaskList.isEmpty();
        return workRemains ? ExecutionStatus.PAUSE : ExecutionStatus.SUCCESS;
    }

    /**
     * Derive the latest process instance state from the task bookkeeping.
     * <p>
     * The state stored in the DB is re-read first, presumably so that externally requested
     * transitions such as READY_PAUSE / READY_STOP are seen. The checks are then applied in
     * priority order: running tasks, failure, waiting for thread, pause, stop, and finally
     * success. The first match wins.
     * <p>
     * NOTE(review): the re-read instance is dereferenced without a null check.
     *
     * @return the state the instance should move to
     */
    private ExecutionStatus getProcessInstanceState() {
        ProcessInstance instance = processDao.findProcessInstanceById(processInstance.getId());
        ExecutionStatus state = instance.getState();

        // tasks still running: keep pending stop/pause/waiting states, otherwise report running
        if (activeTaskNode.size() > 0) {
            return runningState(state);
        }
        // process failure
        if (processFailed()) {
            return ExecutionStatus.FAILURE;
        }

        // waiting thread
        if (hasWaitingThreadTask()) {
            return ExecutionStatus.WAITTING_THREAD;
        }

        // pause
        if(state == ExecutionStatus.READY_PAUSE){
            return processReadyPause();
        }

        // stop: any stopped/killed task or an unfinished complement run means STOP, else SUCCESS
        if(state == ExecutionStatus.READY_STOP){
            List<TaskInstance> stopList = getCompleteTaskByState(ExecutionStatus.STOP);
            List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL);
            if(stopList.size() > 0 || killList.size() > 0 || !isComplementEnd()){
                return ExecutionStatus.STOP;
            }else{
                return ExecutionStatus.SUCCESS;
            }
        }

        // success
        if (state == ExecutionStatus.RUNNING_EXEUTION) {
            if (readyToSubmitTaskList.size() > 0) {
                //tasks currently pending submission, no retries, indicating that depend is waiting to complete
                return ExecutionStatus.RUNNING_EXEUTION;
            } else {
                //  if the waiting queue is empty and the status is in progress, then success
                return ExecutionStatus.SUCCESS;
            }
        }

        // any other state is reported unchanged
        return state;
    }

    /**
     * Whether a complement (backfill) run has reached its last date.
     * <p>
     * Always {@code true} for non-complement instances. For complement instances it uses the
     * termination rule of the complement loop in {@link #executeComplementProcess()}: the run is
     * at its end when the next complement date ({@code scheduleTime + 1 day}) is after the
     * command's complement end date. Sharing the rule keeps the two from disagreeing when the
     * schedule time is not aligned to the end date.
     * <p>
     * Any error, such as a missing schedule time or command param, is logged and treated as
     * "not ended".
     *
     * @return whether the complement run is complete
     */
    private Boolean isComplementEnd() {
        if(!processInstance.isComplementData()){
            return true;
        }

        try {
            Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
            Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
            // same test as the complement loop: is the next date past the end date?
            Date nextScheduleTime = DateUtils.getSomeDay(processInstance.getScheduleTime(), 1);
            return nextScheduleTime.after(endTime);
        } catch (Exception e) {
            logger.error("complement end failed : " + e.getMessage(),e);
            return false;
        }
    }

    /**
     * Recompute the process instance state after each monitoring round and persist any change.
     * When the state changed, the instance is re-read from the DB and updated with the new
     * state and the current process definition. That re-read instance then replaces
     * {@code processInstance}.
     */
    private void updateProcessInstanceState() {
        ExecutionStatus newState = getProcessInstanceState();
        if (processInstance.getState() == newState) {
            return;
        }

        logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
                processInstance.getId(), processInstance.getName(),
                processInstance.getState().toString(), newState.toString(),
                processInstance.getCommandType().toString());
        processInstance.setState(newState);

        ProcessInstance latest = processDao.findProcessInstanceById(processInstance.getId());
        latest.setState(newState);
        latest.setProcessDefinition(processInstance.getProcessDefinition());
        processDao.updateProcessInstance(latest);
        processInstance = latest;
    }

    /**
     * Dependency result for a task instance, delegating to {@link #isTaskDepsComplete(String)}
     * with the task's name.
     *
     * @param taskInstance task instance to check
     * @return the dependency result
     */
    private DependResult getDependResultForTask(TaskInstance taskInstance){
        return isTaskDepsComplete(taskInstance.getName());
    }

    /**
     * Put a task on the standby list, keyed by task name. An existing entry with the same name
     * is kept.
     *
     * @param taskInstance task to queue
     */
    private void addTaskToStandByList(TaskInstance taskInstance) {
        String taskName = taskInstance.getName();
        logger.info("add task to stand by list: {}", taskName);
        readyToSubmitTaskList.putIfAbsent(taskName, taskInstance);
    }

    /**
     * Remove a task from the standby list by task name.
     *
     * @param taskInstance task to remove
     */
    private void removeTaskFromStandbyList(TaskInstance taskInstance){
        String taskName = taskInstance.getName();
        logger.info("remove task from stand by list: {}", taskName);
        readyToSubmitTaskList.remove(taskName);
    }

    /**
     * Whether the standby list holds a task in a failure state, i.e. one queued for retry.
     *
     * @return true if such a task is waiting
     */
    private Boolean hasRetryTaskInStandBy(){
        for (TaskInstance standby : this.readyToSubmitTaskList.values()) {
            if (standby.getState().typeIsFailure()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Submit and watch the tasks until the process instance stops.
     * <p>
     * Submits the start nodes, then loops until {@code processInstance.IsProcessInstanceStop()}.
     * Each round it:
     * <ul>
     *   <li>sends a one-time timeout alert;</li>
     *   <li>harvests finished task exec threads. On success the downstream nodes are submitted.
     *       On failure the task is retried via the standby list or recorded as an error, and
     *       under FailureStrategy.END the running tasks are killed. Other states are simply
     *       marked complete;</li>
     *   <li>sends fault-tolerance alerts and turns paused tasks into KILL once any task failed;</li>
     *   <li>submits standby tasks when resources allow, sleeps, and recomputes the instance state.</li>
     * </ul>
     * <p>
     * A thread's task instance is read only after its future is done. Reading it earlier could
     * return a stale instance when the thread completes between the two calls, so a finished
     * task might be recorded with an old state and its downstream nodes never submitted.
     */
    private void runProcess() {
        // submit start node
        submitPostNode(null);
        boolean sendTimeWarning = false;
        while (!processInstance.IsProcessInstanceStop()) {
            // send warning email if process time out.
            if (!sendTimeWarning && checkProcessTimeOut(processInstance)) {
                alertManager.sendProcessTimeoutAlert(
                        processInstance, processDao.findProcessDefineById(processInstance.getProcessDefinitionId()));
                sendTimeWarning = true;
            }
            Set<MasterBaseTaskExecThread> keys = activeTaskNode.keySet();
            for (MasterBaseTaskExecThread taskExecThread : keys) {
                Future<Boolean> future = activeTaskNode.get(taskExecThread);

                if (!future.isDone()) {
                    continue;
                }
                // read the task only once the thread is done, so we see its final state
                TaskInstance task = taskExecThread.getTaskInstance();
                // node monitor thread complete
                activeTaskNode.remove(taskExecThread);
                if (task == null) {
                    this.taskFailedSubmit = true;
                    continue;
                }
                logger.info("task :{}, id:{} complete, state is {} ", task.getName(), task.getId(), task.getState().toString());
                // node success, post node submit
                if (task.getState() == ExecutionStatus.SUCCESS) {
                    completeTaskList.put(task.getName(), task);
                    submitPostNode(task.getName());
                    continue;
                }
                // node failed: retry first, then apply the failure strategy
                if (task.getState().typeIsFailure()) {
                    if (task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
                        this.recoverToleranceFaultTaskList.add(task);
                    }
                    if (task.taskCanRetry()) {
                        addTaskToStandByList(task);
                    } else {
                        // node failure, based on failure strategy
                        errorTaskList.put(task.getName(), task);
                        completeTaskList.put(task.getName(), task);
                        if (processInstance.getFailureStrategy() == FailureStrategy.END) {
                            kill();
                        }
                    }
                    continue;
                }
                // other status stop/pause
                completeTaskList.put(task.getName(), task);
            }
            // send alert
            if (this.recoverToleranceFaultTaskList.size() > 0) {
                alertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList);
                this.recoverToleranceFaultTaskList.clear();
            }
            // update completed task status
            // failure takes priority over pause:
            // once a task has failed, completed tasks in PAUSE state are reset to KILL
            if (errorTaskList.size() > 0) {
                for(String taskName : completeTaskList.keySet()){
                    TaskInstance completeTask = completeTaskList.get(taskName);
                    if(completeTask.getState() == ExecutionStatus.PAUSE){
                        completeTask.setState(ExecutionStatus.KILL);
                        completeTaskList.put(taskName, completeTask);
                        processDao.updateTaskInstance(completeTask);
                    }
                }
            }
            if (canSubmitTaskToQueue()) {
                submitStandByTask();
            }
            try {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(),e);
            }
            updateProcessInstanceState();
        }
        logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState());
    }

    /**
     * Whether the process instance has run longer than its configured timeout.
     * A timeout of 0 disables the check. Otherwise the minutes elapsed since the start time
     * are compared with the timeout.
     *
     * @param processInstance instance to check
     * @return true if the timeout is set and has been exceeded
     */
    private boolean checkProcessTimeOut(ProcessInstance processInstance) {
        if (processInstance.getTimeout() == 0) {
            return false;
        }
        long elapsedMinutes = DateUtils.diffMin(new Date(), processInstance.getStartTime());
        return elapsedMinutes > processInstance.getTimeout();
    }

    /**
     * Asks {@code OSUtils.checkResource} (with this thread's configuration) whether standby tasks
     * may be submitted now.
     *
     * @return the result of {@code OSUtils.checkResource(conf, true)}
     */
    private boolean canSubmitTaskToQueue() {
        boolean resourcesAvailable = OSUtils.checkResource(conf, true);
        return resourcesAvailable;
    }


    /**
     * Closes the ongoing tasks: asks every active task thread whose future has not completed yet
     * to kill its task.
     */
    private void kill() {
        logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(), activeTaskNode.size());
        for (Map.Entry<MasterBaseTaskExecThread, Future<Boolean>> activeEntry : activeTaskNode.entrySet()) {
            if (activeEntry.getValue().isDone()) {
                // the task has already finished; nothing to kill
                continue;
            }
            MasterBaseTaskExecThread runningTask = activeEntry.getKey();
            logger.info("kill process instance, id: {}, task: {}", processInstance.getId(), runningTask.getTaskInstance().getId());
            runningTask.kill();
        }
    }

    /**
     * Checks whether a task may be (re)submitted now with respect to its retry interval.
     *
     * @param taskInstance the task waiting to be submitted
     * @return {@code Boolean.TRUE} when the task is not in FAILURE state, has no id yet, has no
     *         retries or no retry interval configured, or when more than
     *         {@code retryInterval * SEC_2_MINUTES_TIME_UNIT} seconds have passed since its end
     *         time; {@code Boolean.FALSE} while it is still inside the retry interval
     */
    private Boolean retryTaskIntervalOverTime(TaskInstance taskInstance) {
        boolean retryWaitNotApplicable = taskInstance.getState() != ExecutionStatus.FAILURE
                || taskInstance.getId() == 0
                || taskInstance.getMaxRetryTimes() == 0
                || taskInstance.getRetryInterval() == 0;
        if (retryWaitNotApplicable) {
            return Boolean.TRUE;
        }
        long secondsSinceFailure = DateUtils.differSec(new Date(), taskInstance.getEndTime());
        // still inside the retry interval -> not yet over time
        return Boolean.valueOf(taskInstance.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT < secondsSinceFailure);
    }

    /**
     * Processes the standby list of tasks that are ready to be submitted.
     *
     * <p>A task whose dependencies succeeded is submitted once its retry interval has elapsed and
     * is then removed from the standby list. A task whose dependencies failed is not submitted;
     * it is recorded in {@code dependFailedTask} and removed from the standby list. Any other
     * dependency result leaves the task waiting.
     */
    private void submitStandByTask() {
        for (String standbyTaskName : readyToSubmitTaskList.keySet()) {
            TaskInstance standbyTask = readyToSubmitTaskList.get(standbyTaskName);
            DependResult result = getDependResultForTask(standbyTask);
            if (result == DependResult.FAILED) {
                // dependency failed: do not submit the current node, mark it as depend-failed instead
                dependFailedTask.put(standbyTaskName, standbyTask);
                removeTaskFromStandbyList(standbyTask);
                logger.info("task {},id:{} depend result : {}",standbyTask.getName(), standbyTask.getId(), result);
                continue;
            }
            if (result == DependResult.SUCCESS && retryTaskIntervalOverTime(standbyTask)) {
                submitTaskExec(standbyTask);
                removeTaskFromStandbyList(standbyTask);
            }
        }
    }

    /**
     * Loads the task instance identified by a textual id (one entry of the recovery start-node list).
     *
     * @param taskId task instance id as text; surrounding whitespace is ignored
     * @return the matching task instance, or {@code null} when the id is blank, is not a valid
     *         integer, does not exist, or the lookup throws (every non-blank failure is logged)
     */
    private TaskInstance getRecoveryTaskInstance(String taskId){
        if (StringUtils.isBlank(taskId)) {
            return null;
        }
        // ids come from splitting a comma-separated list, so tolerate "11, 12"
        String trimmedId = taskId.trim();
        try {
            TaskInstance task = processDao.findTaskInstanceById(Integer.valueOf(trimmedId));
            if (task == null) {
                logger.error("start node id cannot be found: {}",  taskId);
            }
            return task;
        } catch (NumberFormatException e) {
            logger.error("start node id is not a valid integer: {}", taskId);
        } catch (Exception e) {
            // best-effort lookup: a DAO failure is logged and treated as "not found"
            logger.error("get recovery task instance failed : {}", e.getMessage(), e);
        }
        return null;
    }

    /**
     * Builds the list of task instances that recovery should start from.
     *
     * <p>Reads the comma-separated id list stored under {@code CMDPARAM_RECOVERY_START_NODE_STRING}
     * in the command parameters and resolves each id via {@code getRecoveryTaskInstance};
     * ids that cannot be resolved are skipped.
     *
     * @param cmdParam command parameter JSON
     * @return the resolved task instances; never {@code null}, empty when nothing is configured
     */
    private List<TaskInstance> getStartTaskInstanceList(String cmdParam){
        List<TaskInstance> instanceList = new ArrayList<>();
        Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
        if (paramMap == null) {
            return instanceList;
        }
        // the key may be present with a null/empty value; calling split on null would throw NPE
        String idListParam = paramMap.get(CMDPARAM_RECOVERY_START_NODE_STRING);
        if (StringUtils.isEmpty(idListParam)) {
            return instanceList;
        }
        for (String nodeId : idListParam.split(Constants.COMMA)) {
            // trim so that "11, 12" resolves both ids
            TaskInstance task = getRecoveryTaskInstance(nodeId.trim());
            if (task != null) {
                instanceList.add(task);
            }
        }
        return instanceList;
    }

    /**
     * Parses the start node names from the command parameters.
     *
     * <p>Reads the comma-separated names stored under {@code CMDPARAM_START_NODE_NAMES}. Empty
     * entries (from an empty value or consecutive commas) are dropped.
     *
     * @param cmdParam command parameter JSON
     * @return a mutable list of start node names; empty when none are configured
     */
    private List<String> parseStartNodeName(String cmdParam){
        List<String> startNodeNameList = new ArrayList<>();
        Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
        if (paramMap == null) {
            return startNodeNameList;
        }
        // the key may be present with a null/empty value; calling split on null would throw NPE
        String nodeNames = paramMap.get(CMDPARAM_START_NODE_NAMES);
        if (StringUtils.isEmpty(nodeNames)) {
            return startNodeNameList;
        }
        for (String nodeName : nodeNames.split(Constants.COMMA)) {
            // "".split(",") yields [""]; an empty name presumably matches no task node, so skip it
            if (StringUtils.isNotEmpty(nodeName)) {
                startNodeNameList.add(nodeName);
            }
        }
        return startNodeNameList;
    }

    /**
     * Collects the names of the recovery task instances held in {@code recoverNodeIdList}.
     *
     * @return the task names in list order; empty when there are no recovery tasks
     */
    private List<String> getRecoveryNodeNameList(){
        List<String> names = new ArrayList<>(recoverNodeIdList.size());
        for (TaskInstance recoveryTask : recoverNodeIdList) {
            names.add(recoveryTask.getName());
        }
        return names;
    }

    /**
     * Builds the process DAG for a process definition by delegating to
     * {@code DagHelper.generateFlowDag}.
     *
     * @param processDefinitionJson process definition JSON
     * @param startNodeNameList     start node names, passed through to DagHelper
     * @param recoveryNodeNameList  recovery node names, passed through to DagHelper
     * @param depNodeType           dependency type, passed through to DagHelper
     * @return the DAG produced by DagHelper
     * @throws Exception whatever {@code DagHelper.generateFlowDag} throws
     */
    public ProcessDag generateFlowDag(String processDefinitionJson,
                                      List<String> startNodeNameList,
                                      List<String> recoveryNodeNameList,
                                      TaskDependType depNodeType) throws Exception {
        ProcessDag flowDag = DagHelper.generateFlowDag(
                processDefinitionJson, startNodeNameList, recoveryNodeNameList, depNodeType);
        return flowDag;
    }

}
